package cn.doitedu.rtmk.demo.demo6;

import groovy.lang.GroovyClassLoader;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class CoreEngineFunction extends KeyedBroadcastProcessFunction<Integer, EventBean, RuleResourceBean, RtmkMessage> {

    private final ConcurrentHashMap<String, RuleModelCalculator> calculatorMap = new ConcurrentHashMap<>();

    @Override
    public void processElement(EventBean eventBean, KeyedBroadcastProcessFunction<Integer, EventBean, RuleResourceBean, RtmkMessage>.ReadOnlyContext ctx, Collector<RtmkMessage> out) throws Exception {

        log.warn("收到一个用户事件,此时 运算机容器size : {}" ,calculatorMap.size());

        // 判断，如果 装运算机容器map还是空，则可能是新启动或者故障后重启
        // 要做一次运算机的恢复
        ReadOnlyBroadcastState<String, RuleResourceBean> broadcastState = ctx.getBroadcastState(StateDescriptors.RULE_RES_STATE_DESC);
        if(calculatorMap.size()==0){
            GroovyClassLoader groovyClassLoader = new GroovyClassLoader();
            for (Map.Entry<String, RuleResourceBean> entry : broadcastState.immutableEntries()) {
                String ruleId = entry.getKey();
                RuleResourceBean resourceBean = entry.getValue();
                // 根据规则资源，重建运算机
                RuleModelCalculator calculator = ParamJsonUtil.generateInitCalculatorByGroovyCode(groovyClassLoader, resourceBean, ruleId, getRuntimeContext());
                // 放入容器
                calculatorMap.put(ruleId,calculator);
                log.warn("恢复规则运算机,所属规则为:{}",ruleId);
            }
        }

        // 遍历 规则运算机容器
        for (Map.Entry<String, RuleModelCalculator> entry : calculatorMap.entrySet()) {
            RuleModelCalculator calculator = entry.getValue();
            // 调用运算机，处理当前的输入事件
            calculator.calc(eventBean,out);
        }
    }

    @Override
    public void processBroadcastElement(RuleResourceBean ruleResourceBean, KeyedBroadcastProcessFunction<Integer, EventBean, RuleResourceBean, RtmkMessage>.Context ctx, Collector<RtmkMessage> out) throws Exception {

        BroadcastState<String, RuleResourceBean> broadcastState = ctx.getBroadcastState(StateDescriptors.RULE_RES_STATE_DESC);
        // 将规则元数据bean,放入广播状态
        broadcastState.put(ruleResourceBean.getRule_id(),ruleResourceBean);

        // 判断，如果 装运算机容器map还是空，则可能是新启动或者故障后重启
        // 要做一次运算机的恢复
        if(calculatorMap.size()==0){
            GroovyClassLoader groovyClassLoader = new GroovyClassLoader();
            for (Map.Entry<String, RuleResourceBean> entry : broadcastState.entries()) {
                String ruleId = entry.getKey();
                RuleResourceBean resourceBean = entry.getValue();
                // 根据规则资源，重建运算机
                RuleModelCalculator calculator = ParamJsonUtil.generateInitCalculatorByGroovyCode(groovyClassLoader, resourceBean, ruleId, getRuntimeContext());
                // 放入容器
                calculatorMap.put(ruleId,calculator);
                log.warn("恢复规则运算机,所属规则为:{}",ruleId);
            }
        }


        /**
         * 根据cdc新监听到的规则信息操作数据中的不同操作类型，进行不同处理
         */
        String rule_id = ruleResourceBean.getRule_id();
        switch (ruleResourceBean.getOperateType()) {
            case 2:
            case 0:
                // 根据规则资源，重建运算机
                GroovyClassLoader groovyClassLoader = new GroovyClassLoader();
                RuleModelCalculator calculator = ParamJsonUtil.generateInitCalculatorByGroovyCode(groovyClassLoader, ruleResourceBean, rule_id, getRuntimeContext());
                // 放入容器
                calculatorMap.put(rule_id,calculator);
                log.warn("收到新注入的规则资源数据,并构建运算机存入容器,所属规则为:{}",rule_id);
                break;
            case 3:
                calculatorMap.remove(rule_id);
                log.warn("下线一个规则: {}",rule_id);
                break;
            default:
        }

    }


}
